{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "hMMdZ1vnxj8f"
   },
   "source": [
    "<center>\n",
    "    <p style=\"text-align:center\">\n",
    "        <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n",
    "        <br>\n",
    "        <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
    "        |\n",
    "        <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
    "        |\n",
    "        <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
    "    </p>\n",
    "</center>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "se3M7GECJfpZ"
   },
   "source": [
    "# AutoGen Agents: Parallelization\n",
    "\n",
    "In this tutorial, we'll explore parallel task execution with [AutoGen](https://microsoft.github.io/autogen/stable//index.html) and how to trace the workflow using Phoenix.\n",
    "\n",
    "Parallelization is a powerful agent pattern where multiple tasks are run concurrently, significantly speeding up the overall process. Unlike purely sequential workflows, this approach is suitable when tasks are independent and can be processed simultaneously. Tracing allows us to monitor this concurrent flow and understand the timing of parallel branches.\n",
    "\n",
    "AutoGen doesn't have a built-in parallel execution manager, but its core agent capabilities integrate seamlessly with standard Python concurrency libraries. We can use these libraries to launch multiple agent interactions concurrently.\n",
    "\n",
    "By the end of this tutorial, you’ll learn how to:\n",
    "\n",
    "- Set up a basic AutoGen agent to perform specific tasks.\n",
    "\n",
    "- Execute a series of agent tasks in parallel using Python's threading library.\n",
    "\n",
    "- Use Phoenix to trace parallel agent interactions.\n",
    "\n",
    "- Compare parallelized and sequential agent workflows.\n",
    "\n",
    "⚠️ You'll need an OpenAI Key for this tutorial."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "G2j6_RvFJfpa"
   },
   "source": [
    "## Set up Keys and Dependencies\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -qq pyautogen==0.9 autogen-agentchat~=0.2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -qqq arize-phoenix arize-phoenix-otel openinference-instrumentation-openai"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from getpass import getpass\n",
    "\n",
    "import autogen\n",
    "\n",
    "if \"OPENAI_API_KEY\" not in os.environ:\n",
    "    os.environ[\"OPENAI_API_KEY\"] = getpass(\"🔑 Enter your OpenAI API key: \")\n",
    "\n",
    "if \"PHOENIX_API_KEY\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n",
    "\n",
    "if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ibmucjY_QkaH"
   },
   "source": [
    "## Configure Tracing\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.otel import register\n",
    "\n",
    "tracer_provider = register(\n",
    "    project_name=\"autogen-agents\",\n",
    "    auto_instrument=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "kTVRWf0NJfpc"
   },
   "source": [
    "## Example Parallelization Task: Product Description Generator\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "qq5TvGKzO685"
   },
   "source": [
    "## Define Agent With Parallelization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "wmWxWQSj-tM2"
   },
   "source": [
    "This section defines a parallelized workflow where multiple tasks are handled by a specialized `AssistantAgent`. Each task runs in its own thread, and tracing captures task-specific spans, including timing. A temporary `UserProxyAgent` is created for each thread to manage isolated chats with the agent.\n",
    "\n",
    "As each task completes, the generated content and task metadata are collected into a shared results list using a lock. This setup enables concurrent LLM interactions.\n",
    "\n",
    "In this example, we'll generate different components of a product description for a smartwatch (features, value proposition, target customer, tagline) by calling a marketing agent.\n",
    "\n",
    "![Diagram](https://storage.googleapis.com/arize-phoenix-assets/assets/images/arize_autogen_parallelization.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "dpjxM6BGLGW0"
   },
   "source": [
    "The `llm_config` specifies the configuration used for all the assistant agents.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm_config = {\n",
    "    \"config_list\": [\n",
    "        {\n",
    "            \"model\": \"gpt-4\",\n",
    "            \"api_key\": os.environ.get(\"OPENAI_API_KEY\"),\n",
    "        }\n",
    "    ]\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specialized LLM AssistantAgent\n",
    "import threading\n",
    "\n",
    "marketing_writer_agent = autogen.AssistantAgent(\n",
    "    name=\"MarketingWriter\",\n",
    "    llm_config=llm_config,\n",
    "    system_message=\"You are a concise and persuasive marketing copywriter. Generate content based only on the specific prompt you receive. Keep your response focused on the requested information.\",\n",
    ")\n",
    "\n",
    "results = []\n",
    "results_lock = threading.Lock()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "import opentelemetry.context as context_api\n",
    "from opentelemetry import trace\n",
    "from opentelemetry.trace import Status, StatusCode\n",
    "\n",
    "tracer = tracer_provider.get_tracer(__name__)\n",
    "\n",
    "\n",
    "def run_agent_task_with_tracing(agent, task_prompt, task_id, parent_context):\n",
    "    token = context_api.attach(parent_context)\n",
    "    thread_name = threading.current_thread().name\n",
    "\n",
    "    with tracer.start_as_current_span(task_id, openinference_span_kind=\"agent\") as child_span:\n",
    "        start_time = time.time()\n",
    "\n",
    "        # Create a temp UserProxyAgent for current thread\n",
    "        temp_user_proxy = autogen.UserProxyAgent(\n",
    "            name=f\"UserProxy_{thread_name}\",\n",
    "            human_input_mode=\"NEVER\",\n",
    "            max_consecutive_auto_reply=1,\n",
    "            is_termination_msg=lambda x: True,\n",
    "            code_execution_config=False,\n",
    "        )\n",
    "\n",
    "        # Initiate the chat\n",
    "        temp_user_proxy.initiate_chat(\n",
    "            agent,\n",
    "            message=task_prompt,\n",
    "            clear_history=True,\n",
    "            silent=True,\n",
    "        )\n",
    "\n",
    "        # Get last message\n",
    "        assistant_reply = \"No reply found.\"\n",
    "        if temp_user_proxy.last_message(agent):\n",
    "            assistant_reply = temp_user_proxy.last_message(agent).get(\"content\", \"No reply found.\")\n",
    "        child_span.set_status(Status(StatusCode.OK))\n",
    "\n",
    "        end_time = time.time()\n",
    "        duration = end_time - start_time\n",
    "        print(f\"Thread {task_id}: Finished in {duration:.2f} seconds.\")\n",
    "\n",
    "        child_span.set_status(trace.StatusCode.OK)\n",
    "\n",
    "        results.append(\n",
    "            {\"task_id\": task_id, \"generated_content\": assistant_reply, \"duration\": duration}\n",
    "        )\n",
    "\n",
    "    context_api.detach(token)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "DsIAOzT47pW8"
   },
   "source": [
    "### Run and Trace Agent"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "BpTsxqn-_b5q"
   },
   "source": [
    "Next, we run our agent. After all threads complete, the results are collected, sorted based on the original task order, and printed out."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_total_time = time.time()\n",
    "\n",
    "results = []\n",
    "\n",
    "with tracer.start_as_current_span(\"Parallelized\", openinference_span_kind=\"agent\") as parent_span:\n",
    "    tasks = [\n",
    "        {\"id\": \"Features\", \"prompt\": \"List key features of the new SmartX Pro smartwatch...\"},\n",
    "        {\"id\": \"ValueProp\", \"prompt\": \"Describe the main value proposition of SmartX Pro...\"},\n",
    "        {\"id\": \"TargetCustomer\", \"prompt\": \"Describe the ideal customer profile for SmartX Pro...\"},\n",
    "        {\"id\": \"Tagline\", \"prompt\": \"Create 3 catchy marketing taglines for SmartX Pro.\"},\n",
    "    ]\n",
    "\n",
    "    parent_context_to_pass = context_api.get_current()\n",
    "\n",
    "    threads = []\n",
    "    for i, task_info in enumerate(tasks):\n",
    "        thread = threading.Thread(\n",
    "            target=run_agent_task_with_tracing,\n",
    "            args=(\n",
    "                marketing_writer_agent,\n",
    "                task_info[\"prompt\"],\n",
    "                task_info[\"id\"],\n",
    "                parent_context_to_pass,\n",
    "            ),\n",
    "            name=f\"{task_info['id']}\",\n",
    "        )\n",
    "        threads.append(thread)\n",
    "        thread.start()\n",
    "\n",
    "    for thread in threads:\n",
    "        thread.join()\n",
    "\n",
    "end_total_time = time.time()\n",
    "total_duration = end_total_time - start_total_time\n",
    "print(f\"\\nAll threads completed in {total_duration:.2f} seconds.\")\n",
    "\n",
    "# --- Print Results ---\n",
    "results.sort(key=lambda x: [t[\"id\"] for t in tasks].index(x[\"task_id\"]))\n",
    "print(\"--- Results ---\")\n",
    "for result in results:\n",
    "    print(f\"--- {result['task_id']} ---\")\n",
    "    print(f\"Generated Content:\\n{result['generated_content']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Byt4nzauod39"
   },
   "source": [
    "## Define Agent Without Parallelization (Latency Comparison)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "e1SY8I00_zqj"
   },
   "source": [
    "This section runs the same set of content generation tasks, but sequentially instead of in parallel. Each task is executed one after the other. This allows for a direct comparison of total execution time between sequential and parallel workflows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results = []\n",
    "\n",
    "\n",
    "def run_single_task(agent, task_prompt, task_id):\n",
    "    with tracer.start_as_current_span(f\"{task_id}\", openinference_span_kind=\"agent\") as span:\n",
    "        start_time = time.time()\n",
    "\n",
    "        # Create a temp UserProxyAgent\n",
    "        temp_user_proxy = autogen.UserProxyAgent(\n",
    "            name=\"UserProxy_Sequential\",\n",
    "            human_input_mode=\"NEVER\",\n",
    "            max_consecutive_auto_reply=1,\n",
    "            is_termination_msg=lambda x: True,\n",
    "            code_execution_config=False,\n",
    "        )\n",
    "\n",
    "        reply = \"No reply found.\"\n",
    "        temp_user_proxy.initiate_chat(\n",
    "            agent,\n",
    "            message=task_prompt,\n",
    "            clear_history=True,\n",
    "            silent=True,\n",
    "        )\n",
    "\n",
    "        # Get last message\n",
    "        if temp_user_proxy.last_message(agent):\n",
    "            reply = temp_user_proxy.last_message(agent).get(\"content\", \"No reply found.\")\n",
    "        span.set_status(Status(StatusCode.OK))\n",
    "\n",
    "        end_time = time.time()\n",
    "        duration = end_time - start_time\n",
    "        print(f\"Finished task '{task_id}' in {duration:.2f} seconds.\")\n",
    "\n",
    "        results.append({\"task_id\": task_id, \"prompt\": task_prompt, \"generated_content\": reply})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "avfCOs317rZy"
   },
   "source": [
    "### Run and Trace Sequential Agent"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_total_time = time.time()\n",
    "\n",
    "with tracer.start_as_current_span(\"Sequential\", openinference_span_kind=\"agent\") as parent_span:\n",
    "    # Define tasks\n",
    "    tasks = [\n",
    "        {\n",
    "            \"id\": \"Features\",\n",
    "            \"prompt\": \"List key features of the new SmartX Pro smartwatch. Be specific (e.g., screen type, battery life, sensors, connectivity).\",\n",
    "        },\n",
    "        {\n",
    "            \"id\": \"ValueProp\",\n",
    "            \"prompt\": \"Describe the main value proposition of SmartX Pro for busy professionals. Focus on how it solves their problems or improves their lives.\",\n",
    "        },\n",
    "        {\n",
    "            \"id\": \"TargetCustomer\",\n",
    "            \"prompt\": \"Describe the ideal customer profile for the SmartX Pro smartwatch. Include demographics, lifestyle, and needs.\",\n",
    "        },\n",
    "        {\n",
    "            \"id\": \"Tagline\",\n",
    "            \"prompt\": \"Create 3 catchy and distinct marketing taglines for SmartX Pro.\",\n",
    "        },\n",
    "    ]\n",
    "\n",
    "    for i, task_info in enumerate(tasks):\n",
    "        parent_span.add_event(f\"Starting task {i + 1}: {task_info['id']}\")\n",
    "        run_single_task(\n",
    "            agent=marketing_writer_agent, task_prompt=task_info[\"prompt\"], task_id=task_info[\"id\"]\n",
    "        )\n",
    "        parent_span.add_event(f\"Finished task {i + 1}: {task_info['id']}\")\n",
    "\n",
    "\n",
    "end_total_time = time.time()\n",
    "total_duration = end_total_time - start_total_time\n",
    "print(f\"\\nAll tasks completed sequentially in {total_duration:.2f} seconds.\")\n",
    "\n",
    "\n",
    "print(\"--- Results ---\")\n",
    "for result in results:\n",
    "    print(f\"--- {result['task_id']} ---\")\n",
    "    print(f\"Generated Content:\\n{result['generated_content']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "pwbMnN9EfGWF"
   },
   "source": [
    "## View Results in Phoenix"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "qS3VEJ2K7x0Q"
   },
   "source": [
    "The tracing results below focus on the performance of the parallelized workflow. Compared to the parallelized agent, the sequential version takes 3–4 times longer to complete. In Phoenix, we can view a detailed breakdown of LLM inputs and outputs, along with metadata such as latency, token counts, and model configurations."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "errpoyM5-WvU"
   },
   "source": [
    "![Results](https://storage.googleapis.com/arize-phoenix-assets/assets/gifs/phoenix_autogen_parallelized.gif)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
